Source code for momepy.elements

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# elements.py
# generating derived elements (street edge, block)
import operator

import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
import libpysal
from scipy.spatial import Voronoi
from shapely.geometry import MultiPolygon, Point, Polygon
from shapely.wkt import loads
from tqdm import tqdm

# nothing - temporary solution for readthedocs fail. - cannot mock osgeo
try:
    from osgeo import ogr
except ModuleNotFoundError:
    print("rtd")

__all__ = ["buffered_limit", "Tessellation", "Blocks", "get_network_id", "get_node_id"]


[docs]def buffered_limit(gdf, buffer=100): """ Define limit for tessellation as a buffer around buildings. Parameters ---------- gdf : GeoDataFrame GeoDataFrame containing building footprints buffer : float buffer around buildings limiting the extend of tessellation Returns ------- MultiPolygon MultiPolygon or Polygon defining the study area Examples -------- >>> limit = mm.buffered_limit(buildings_df) >>> type(limit) shapely.geometry.polygon.Polygon """ study_area = gdf.copy() study_area["geometry"] = study_area.buffer(buffer) built_up = study_area.geometry.unary_union return built_up
[docs]class Tessellation: """ Generate morphological tessellation around given buildings or proximity bands around given street network. Parameters ---------- gdf : GeoDataFrame GeoDataFrame containing building footprints or street network unique_id : str name of the column with unique id limit : MultiPolygon or Polygon MultiPolygon or Polygon defining the study area limiting tessellation (otherwise it could go to infinity). shrink : float (default 0.4) distance for negative buffer to generate space between adjacent polygons (if geometry type of gdf is (Multi)Polygon). segment : float (default 0.5) maximum distance between points after discretisation Attributes ---------- tessellation : GeoDataFrame GeoDataFrame containing resulting tessellation gdf : GeoDataFrame original GeoDataFrame id : Series Series containing used unique ID limit : MultiPolygon or Polygon limit shrink : float used shrink value segment : float used segment value sindex : rtree spatial index spatial index of tessellation Examples -------- >>> tess = mm.Tessellation(buildings_df, 'uID', limit=mm.buffered_limit(buildings_df)) Inward offset... Discretization... Generating input point array... 100%|██████████| 144/144 [00:00<00:00, 376.15it/s] Generating Voronoi diagram... Generating GeoDataFrame... Vertices to Polygons: 100%|██████████| 33059/33059 [00:01<00:00, 31532.72it/s] Dissolving Voronoi polygons... Preparing buffer zone for edge resolving... Building R-tree... 100%|██████████| 42/42 [00:00<00:00, 752.54it/s] Cutting... >>> tess.tessellation.head() uID geometry 0 1 POLYGON ((1603586.677274485 6464344.667944215,... 1 2 POLYGON ((1603048.399497852 6464176.180701573,... 2 3 POLYGON ((1603071.342637536 6464158.863329805,... 3 4 POLYGON ((1603055.834005827 6464093.614718676,... 4 5 POLYGON ((1603106.417554705 6464130.215958447,... Notes ------- queen_corners is currently experimental method only and can cause errors. """
[docs] def __init__(self, gdf, unique_id, limit, shrink=0.4, segment=0.5): self.gdf = gdf self.id = gdf[unique_id] self.limit = limit self.shrink = shrink self.segment = segment objects = gdf.copy() centre = self._get_centre(objects) objects["geometry"] = objects["geometry"].translate( xoff=-centre[0], yoff=-centre[1] ) polys = ["Polygon", "MultiPolygon"] print("Inward offset...") objects["geometry"] = objects.geometry.apply( lambda g: g.buffer(-shrink, cap_style=2, join_style=2) if g.type in polys else g ) objects = objects.explode() objects.reset_index(inplace=True, drop=True) print("Discretization...") objects["geometry"] = objects["geometry"].apply(self._densify, segment=segment) print("Generating input point array...") points, ids = self._point_array(objects, unique_id) # add convex hull buffered large distance to eliminate infinity issues series = gpd.GeoSeries(limit, crs=gdf.crs).translate( xoff=-centre[0], yoff=-centre[1] ) hull = series.geometry[0].convex_hull.buffer(300) hull = self._densify(hull, 20) hull_array = np.array(hull.boundary.coords).tolist() for i, a in enumerate(hull_array): points.append(hull_array[i]) ids.append(-1) print("Generating Voronoi diagram...") voronoi_diagram = Voronoi(np.array(points)) print("Generating GeoDataFrame...") regions_gdf = self._regions(voronoi_diagram, unique_id, ids, crs=gdf.crs) print("Dissolving Voronoi polygons...") morphological_tessellation = regions_gdf[[unique_id, "geometry"]].dissolve( by=unique_id, as_index=False ) morphological_tessellation["geometry"] = morphological_tessellation[ "geometry" ].translate(xoff=centre[0], yoff=centre[1]) morphological_tessellation, sindex = self._cut( morphological_tessellation, limit, unique_id ) self.sindex = sindex self._check_result(morphological_tessellation, gdf, unique_id=unique_id) self.tessellation = morphological_tessellation
def queen_corners(self, sensitivity): """ Experimental: Fix unprecise corners. """ tessellation = self.tessellation.copy() changes = {} qid = 0 for ix, row in tqdm(tessellation.iterrows(), total=tessellation.shape[0]): corners = [] change = [] cell = row.geometry coords = cell.exterior.coords for i in coords: point = Point(i) possible_matches_index = list(self.sindex.intersection(point.bounds)) possible_matches = tessellation.iloc[possible_matches_index] precise_matches = sum(possible_matches.intersects(point)) if precise_matches > 2: corners.append(point) if len(corners) > 2: for c, it in enumerate(corners): next_c = c + 1 if c == (len(corners) - 1): next_c = 0 if corners[c].distance(corners[next_c]) < sensitivity: change.append([corners[c], corners[next_c]]) elif len(corners) == 2: if corners[0].distance(corners[1]) > 0: if corners[0].distance(corners[1]) < sensitivity: change.append([corners[0], corners[1]]) if change: for points in change: x_new = np.mean([points[0].x, points[1].x]) y_new = np.mean([points[0].y, points[1].y]) new = [(x_new, y_new), id] changes[(points[0].x, points[0].y)] = new changes[(points[1].x, points[1].y)] = new qid = qid + 1 for ix, row in tqdm(tessellation.iterrows(), total=tessellation.shape[0]): cell = row.geometry coords = list(cell.exterior.coords) moves = {} for x in coords: if x in changes.keys(): moves[coords.index(x)] = changes[x] keys = list(moves.keys()) delete_points = [] for move, k in enumerate(keys): if move < len(keys) - 1: if ( moves[keys[move]][1] == moves[keys[move + 1]][1] and keys[move + 1] - keys[move] < 5 ): delete_points = delete_points + ( coords[keys[move] : keys[move + 1]] ) # change the code above to have if based on distance not number newcoords = [changes[x][0] if x in changes.keys() else x for x in coords] for coord in newcoords: if coord in delete_points: newcoords.remove(coord) if coords != newcoords: if not cell.interiors: # newgeom = Polygon(newcoords).buffer(0) be = Polygon(newcoords).exterior mls = be.intersection(be) if len(list(shapely.ops.polygonize(mls))) > 1: newgeom = MultiPolygon(shapely.ops.polygonize(mls)) geoms = [] for g, n in enumerate(newgeom): geoms.append(newgeom[g].area) newgeom = newgeom[geoms.index(max(geoms))] else: newgeom = list(shapely.ops.polygonize(mls))[0] else: newgeom = Polygon(newcoords, holes=cell.interiors) tessellation.loc[ix, "geometry"] = newgeom return tessellation def _get_centre(self, gdf): """ Returns centre coords of gdf. """ bounds = gdf["geometry"].bounds centre_x = (bounds["maxx"].max() + bounds["minx"].min()) / 2 centre_y = (bounds["maxy"].max() + bounds["miny"].min()) / 2 return centre_x, centre_y # densify geometry before Voronoi tesselation def _densify(self, geom, segment): """ Returns densified geoemtry with segments no longer than `segment`. """ poly = geom wkt = geom.wkt # shapely Polygon to wkt geom = ogr.CreateGeometryFromWkt(wkt) # create ogr geometry geom.Segmentize(segment) # densify geometry by 2 metres geom.CloseRings() # fix for GDAL 2.4.1 bug wkt2 = geom.ExportToWkt() # ogr geometry to wkt try: new = loads(wkt2) # wkt to shapely Polygon return new except Exception: return poly def _point_array(self, objects, unique_id): """ Returns lists of points and ids based on geometry and unique_id. """ points = [] ids = [] for idx, row in tqdm(objects.iterrows(), total=objects.shape[0]): if row["geometry"].type in ["Polygon", "MultiPolygon"]: poly_ext = row["geometry"].boundary else: poly_ext = row["geometry"] if poly_ext is not None: if poly_ext.type == "MultiLineString": for line in poly_ext: point_coords = line.coords row_array = np.array(point_coords[:-1]).tolist() for i, a in enumerate(row_array): points.append(row_array[i]) ids.append(row[unique_id]) elif poly_ext.type == "LineString": point_coords = poly_ext.coords row_array = np.array(point_coords[:-1]).tolist() for i, a in enumerate(row_array): points.append(row_array[i]) ids.append(row[unique_id]) else: raise Exception("Boundary type is {}".format(poly_ext.type)) return points, ids def _regions(self, voronoi_diagram, unique_id, ids, crs): """ Generate GeoDataFrame of Voronoi regions from scipy.spatial.Voronoi. """ # generate DataFrame of results regions = pd.DataFrame() regions[unique_id] = ids # add unique id regions["region"] = voronoi_diagram.point_region # add region id for each point # add vertices of each polygon vertices = [] for region in regions.region: vertices.append(voronoi_diagram.regions[region]) regions["vertices"] = vertices # convert vertices to Polygons polygons = [] for region in tqdm(regions.vertices, desc="Vertices to Polygons"): if -1 not in region: polygons.append(Polygon(voronoi_diagram.vertices[region])) else: polygons.append(None) # save polygons as geometry column regions["geometry"] = polygons # generate GeoDataFrame regions_gdf = gpd.GeoDataFrame(regions.dropna(), geometry="geometry") regions_gdf = regions_gdf.loc[ regions_gdf["geometry"].length < 1000000 ] # delete errors regions_gdf = regions_gdf.loc[ regions_gdf[unique_id] != -1 ] # delete hull-based cells regions_gdf.crs = crs return regions_gdf def _cut(self, tessellation, limit, unique_id): """ Cut tessellation by the limit (Multi)Polygon. ADD: add option to delete everything outside of limit. Now it keeps it. """ print("Preparing limit for edge resolving...") geometry_cut = _split_lines(limit, 100) print("Building R-tree...") sindex = tessellation.sindex # find the points that intersect with each subpolygon and add them to points_within_geometry print("Identifying edge cells...") to_cut = pd.DataFrame() for poly in tqdm(geometry_cut, total=(len(geometry_cut))): # find approximate matches with r-tree, then precise matches from those approximate ones possible_matches_index = list(sindex.intersection(poly.bounds)) possible_matches = tessellation.iloc[possible_matches_index] precise_matches = possible_matches[possible_matches.intersects(poly)] to_cut = to_cut.append(precise_matches) # delete duplicates to_cut = to_cut.drop_duplicates(subset=[unique_id]) subselection = list(to_cut.index) print("Cutting...") for idx, row in tqdm( tessellation.loc[subselection].iterrows(), total=tessellation.loc[subselection].shape[0], ): intersection = row.geometry.intersection(limit) if intersection.type == "MultiPolygon": areas = {} for p, i in enumerate(intersection): area = intersection[p].area areas[p] = area maximal = max(areas.items(), key=operator.itemgetter(1))[0] tessellation.loc[idx, "geometry"] = intersection[maximal] elif intersection.type == "GeometryCollection": for geom in list(intersection.geoms): if geom.type != "Polygon": pass else: tessellation.loc[idx, "geometry"] = geom else: tessellation.loc[idx, "geometry"] = intersection return tessellation, sindex def _check_result(self, tesselation, orig_gdf, unique_id): """ Check whether result of tessellation matches buildings and contains only Polygons. """ # check against input layer ids_original = list(orig_gdf[unique_id]) ids_generated = list(tesselation[unique_id]) if len(ids_original) != len(ids_generated): import warnings diff = set(ids_original).difference(ids_generated) warnings.warn( "Tessellation does not fully match buildings. {len} element(s) collapsed " "during generation - unique_id: {i}".format(len=len(diff), i=diff) ) # check MultiPolygons - usually caused by error in input geometry uids = tesselation[tesselation.geometry.type == "MultiPolygon"][unique_id] if len(uids) > 0: import warnings warnings.warn( "Tessellation contains MultiPolygon elements. Initial objects should be edited. " "unique_id of affected elements: {}".format(list(uids)) )
[docs]class Blocks: """ Generate blocks based on buildings, tesselation and street network Captures bID to buildings and tesselation as attributes. Parameters ---------- tessellation : GeoDataFrame GeoDataFrame containing morphological tessellation edges : GeoDataFrame GeoDataFrame containing street network buildings : GeoDataFrame GeoDataFrame containing buildings id_name : str name of the unique blocks id column to be generated unique_id : str name of the column with unique id. If there is none, it could be generated by unique_id(). This should be the same for cells and buildings, id's should match. Attributes ---------- blocks : GeoDataFrame GeoDataFrame containing generated blocks buildings_id : Series Series derived from buildings with block ID tessellation_id : Series Series derived from morphological tessellation with block ID tessellation : GeoDataFrame GeoDataFrame containing original tessellation edges : GeoDataFrame GeoDataFrame containing original edges buildings : GeoDataFrame GeoDataFrame containing original buildings id_name : str name of the unique blocks id column unique_id : str name of the column with unique id Examples -------- >>> blocks_generate = mm.Blocks(tessellation_df, streets_df, buildings_df, 'bID', 'uID') Buffering streets... Generating spatial index... Difference... Defining adjacency... Defining street-based blocks... Defining block ID... Generating centroids... Spatial join... Attribute join (tesselation)... Generating blocks... Multipart to singlepart... Attribute join (buildings)... Attribute join (tesselation)... >>> blocks_generate.blocks.head() bID geometry 0 1.0 POLYGON ((1603560.078648818 6464202.366899694,... 1 2.0 POLYGON ((1603457.225976106 6464299.454696888,... 2 3.0 POLYGON ((1603056.595487018 6464093.903488506,... 3 4.0 POLYGON ((1603260.943782872 6464141.327631323,... 4 5.0 POLYGON ((1603183.399594798 6463966.109982309,... """
[docs] def __init__(self, tessellation, edges, buildings, id_name, unique_id): self.tessellation = tessellation self.edges = edges self.buildings = buildings self.id_name = id_name self.unique_id = unique_id if id_name in buildings.columns: raise ValueError( "'{}' column cannot be in the buildings GeoDataFrame".format(id_name) ) cells_copy = tessellation[[unique_id, "geometry"]].copy() print("Buffering streets...") street_buff = edges.copy() street_buff["geometry"] = street_buff.buffer(0.1) print("Generating spatial index...") streets_index = street_buff.sindex print("Difference...") new_geom = [] for ix, cell in tqdm( cells_copy.geometry.iteritems(), total=cells_copy.shape[0] ): possible_matches_index = list(streets_index.intersection(cell.bounds)) possible_matches = street_buff.iloc[possible_matches_index] new_geom.append(cell.difference(possible_matches.geometry.unary_union)) print("Defining adjacency...") blocks_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(new_geom)) blocks_gdf = blocks_gdf.explode().reset_index(drop=True) spatial_weights = libpysal.weights.Queen.from_dataframe( blocks_gdf, silence_warnings=True ) patches = {} jID = 1 for idx, row in tqdm(blocks_gdf.iterrows(), total=blocks_gdf.shape[0]): # if the id is already present in courtyards, continue (avoid repetition) if idx in patches: continue else: to_join = [idx] # list of indices which should be joined together neighbours = [] # list of neighbours weights = spatial_weights.neighbors[ idx ] # neighbours from spatial weights for w in weights: neighbours.append(w) # make a list from weigths for n in neighbours: while ( n not in to_join ): # until there is some neighbour which is not in to_join to_join.append(n) weights = spatial_weights.neighbors[n] for w in weights: neighbours.append( w ) # extend neighbours by neighbours of neighbours :) for b in to_join: patches[b] = jID # fill dict with values jID = jID + 1 blocks_gdf["patch"] = blocks_gdf.index.map(patches) print("Defining street-based blocks...") blocks_single = blocks_gdf.dissolve(by="patch") blocks_single.crs = buildings.crs blocks_single["geometry"] = blocks_single.buffer(0.1) print("Defining block ID...") # street based blocks_single[id_name] = range(len(blocks_single)) print("Generating centroids...") buildings_c = buildings.copy() buildings_c["geometry"] = buildings_c.representative_point() # make points print("Spatial join...") centroids_tempID = gpd.sjoin( buildings_c, blocks_single, how="left", op="intersects" ) tempID_to_uID = centroids_tempID[[unique_id, id_name]] print("Attribute join (tesselation)...") cells_copy = cells_copy.merge(tempID_to_uID, on=unique_id, how="left") print("Generating blocks...") blocks = cells_copy.dissolve(by=id_name) print("Multipart to singlepart...") blocks = blocks.explode() blocks.reset_index(inplace=True, drop=True) blocks["geometry"] = blocks.exterior blocks[id_name] = range(len(blocks)) blocks["geometry"] = blocks.apply(lambda row: Polygon(row.geometry), axis=1) # if polygon is within another one, delete it sindex = blocks.sindex for idx, row in tqdm(blocks.iterrows(), total=blocks.shape[0]): possible_matches = list(sindex.intersection(row.geometry.bounds)) possible_matches.remove(idx) possible = blocks.iloc[possible_matches] for idx2, row2 in possible.iterrows(): if row["geometry"].within(row2["geometry"]): blocks.loc[idx, "delete"] = 1 if "delete" in blocks.columns: blocks = blocks.drop(list(blocks.loc[blocks["delete"] == 1].index)) self.blocks = blocks[[id_name, "geometry"]] centroids_w_bl_ID2 = gpd.sjoin( buildings_c, self.blocks, how="left", op="intersects" ) bl_ID_to_uID = centroids_w_bl_ID2[[unique_id, id_name]] print("Attribute join (buildings)...") buildings_m = buildings[[unique_id]].merge( bl_ID_to_uID, on=unique_id, how="left" ) self.buildings_id = buildings_m[id_name] print("Attribute join (tesselation)...") cells_m = tessellation[[unique_id]].merge( bl_ID_to_uID, on=unique_id, how="left" ) self.tessellation_id = cells_m[id_name]
[docs]def get_network_id(left, right, network_id, min_size=100): """ Snap each element (preferably building) to the closest street network segment, saves its id. Adds network ID to elements. Parameters ---------- left : GeoDataFrame GeoDataFrame containing objects to snap right : GeoDataFrame GeoDataFrame containing street network with unique network ID. If there is none, it could be generated by :py:func:`momepy.elements.unique_id`. network_id : str, list, np.array, pd.Series (default None) the name of the streets dataframe column, np.array, or pd.Series with network unique id. min_size : int (default 100) min_size should be a vaule such that if you build a box centered in each building centroid with edges of size `2*min_size`, you know a priori that at least one segment is intersected with the box. Returns ------- elements_nID : Series Series containing network ID for elements Examples -------- >>> buildings_df['nID'] = momepy.get_network_id(buildings_df, streets_df, 'nID') Generating centroids... Generating rtree... Snapping: 100%|██████████| 144/144 [00:00<00:00, 2718.98it/s] >>> buildings_df['nID'][0] 1 """ INFTY = 1000000000000 MIN_SIZE = min_size # MIN_SIZE should be a vaule such that if you build a box centered in each # point with edges of size 2*MIN_SIZE, you know a priori that at least one # segment is intersected with the box. Otherwise, you could get an inexact # solution, there is an exception checking this, though. left = left.copy() right = right.copy() if not isinstance(network_id, str): right["mm_nid"] = network_id network_id = "mm_nid" print("Generating centroids...") buildings_c = left.copy() buildings_c["geometry"] = buildings_c.centroid # make centroids print("Generating rtree...") idx = right.sindex result = [] for ix, r in tqdm( buildings_c.iterrows(), total=buildings_c.shape[0], desc="Snapping" ): p = r.geometry pbox = (p.x - MIN_SIZE, p.y - MIN_SIZE, p.x + MIN_SIZE, p.y + MIN_SIZE) hits = list(idx.intersection(pbox)) d = INFTY nid = None for h in hits: new_d = p.distance(right.geometry.iloc[h]) if d >= new_d: d = new_d nid = right[network_id].iloc[h] if nid is None: result.append(np.nan) else: result.append(nid) series = pd.Series(result) if series.isnull().any(): import warnings warnings.warn( "Some objects were not attached to the network. " "Set larger min_size. {} affected elements".format(sum(series.isnull())) ) return series
[docs]def get_node_id(objects, nodes, edges, node_id, edge_id): """ Snap each building to closest street network node on the closest network edge. Adds node ID to objects (preferably buildings). Gets ID of edge (:py:func:`momepy.get_node_id`) , and determines which of its end points is closer to building centroid. Parameters ---------- objects : GeoDataFrame GeoDataFrame containing objects to snap nodes : GeoDataFrame GeoDataFrame containing street nodes with unique node ID. If there is none, it could be generated by :py:func:`momepy.unique_id`. edges : GeoDataFrame GeoDataFrame containing street edges with unique edge ID and IDs of start and end points of each segment. Start and endpoints are default outcome of :py:func:`momepy.nx_to_gdf`. node_id : str, list, np.array, pd.Series (default None) the name of the nodes dataframe column, np.array, or pd.Series with unique id Returns ------- node_ids : Series Series containing node ID for objects """ nodes = nodes.copy() if not isinstance(node_id, str): nodes["mm_noid"] = node_id node_id = "mm_noid" results_list = [] for index, row in tqdm(objects.iterrows(), total=objects.shape[0]): if np.isnan(row[edge_id]): results_list.append(np.nan) else: centroid = row.geometry.centroid edge = edges.loc[edges[edge_id] == row[edge_id]].iloc[0] startID = edge.node_start start = nodes.loc[nodes[node_id] == startID].iloc[0].geometry sd = centroid.distance(start) endID = edge.node_end end = nodes.loc[nodes[node_id] == endID].iloc[0].geometry ed = centroid.distance(end) if sd > ed: results_list.append(endID) else: results_list.append(startID) series = pd.Series(results_list, index=objects.index) return series
def _split_lines(polygon, distance): """Split polygon into GeoSeries of lines no longer than `distance`.""" list_points = [] current_dist = distance # set the current distance to place the point boundary = polygon.boundary # make shapely MultiLineString object if boundary.type == "LineString": line_length = boundary.length # get the total length of the line while ( current_dist < line_length ): # while the current cumulative distance is less than the total length of the line list_points.append( boundary.interpolate(current_dist) ) # use interpolate and increase the current distance current_dist += distance elif boundary.type == "MultiLineString": for ls in boundary: line_length = ls.length # get the total length of the line while ( current_dist < line_length ): # while the current cumulative distance is less than the total length of the line list_points.append( ls.interpolate(current_dist) ) # use interpolate and increase the current distance current_dist += distance cutted = shapely.ops.split( boundary, shapely.geometry.MultiPoint(list_points).buffer(0.001) ) return cutted